package drds.data_propagate.store;

import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.entry.ClientId;
import drds.data_propagate.entry.position.BinLogEventPosition;
import drds.data_propagate.entry.position.Position;
import drds.data_propagate.metadata.MetaDataManager;

import java.util.List;

/**
 * store回收机制
 */
public abstract class AbstractStoreScavenge extends AbstractLifeCycle implements StoreScavenge {

    protected String taskId;
    protected MetaDataManager metaDataManager;
    protected boolean onAck = true;
    protected boolean onFull = false;
    protected boolean onSchedule = false;
    protected String scavengeSchedule = null;

    public void scavenge() {
        Position position = getLatestAckPosition(taskId);
        cleanUntil(position);
    }

    /**
     * 找出该destination中可被清理掉的position位置
     */
    private Position getLatestAckPosition(String destination) {
        List<ClientId> clientIdList = metaDataManager.listAllSubscribeInfo(destination);
        BinLogEventPosition result = null;
        if (clientIdList != null && clientIdList.size() > 0) {
            // 尝试找到一个最小的logPosition
            for (ClientId clientId : clientIdList) {
                BinLogEventPosition binLogEventPosition = (BinLogEventPosition) metaDataManager.getPosition(clientId);
                if (binLogEventPosition == null) {
                    continue;
                }

                if (result == null) {
                    result = binLogEventPosition;
                } else {
                    result = min(result, binLogEventPosition);
                }
            }
        }

        return result;
    }

    /**
     * 找出一个最小的position位置
     */
    private BinLogEventPosition min(BinLogEventPosition position1, BinLogEventPosition position2) {
        if (position1.getSlaveId().equals(position2.getSlaveId())) {
            // 首先根据文件进行比较
            if (position1.getEntryPosition().getBinlogFileName().compareTo(position2.getEntryPosition().getBinlogFileName()) < 0) {
                return position2;
            } else if (position1.getEntryPosition().getBinlogFileName().compareTo(position2.getEntryPosition().getBinlogFileName()) > 0) {
                return position1;
            } else {
                // 根据offest进行比较
                if (position1.getEntryPosition().getBinlogFileOffset() < position2.getEntryPosition().getBinlogFileOffset()) {
                    return position2;
                } else {
                    return position1;
                }
            }
        } else {
            // 不同的主备库，根据时间进行比较
            if (position1.getEntryPosition().getExecuteTimestamp() < position2.getEntryPosition().getExecuteTimestamp()) {
                return position2;
            } else {
                return position1;
            }
        }
    }

    public void setOnAck(boolean onAck) {
        this.onAck = onAck;
    }

    public void setOnFull(boolean onFull) {
        this.onFull = onFull;
    }

    public void setOnSchedule(boolean onSchedule) {
        this.onSchedule = onSchedule;
    }

    public String getScavengeSchedule() {
        return scavengeSchedule;
    }

    public void setScavengeSchedule(String scavengeSchedule) {
        this.scavengeSchedule = scavengeSchedule;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public void setMetaDataManager(MetaDataManager metaDataManager) {
        this.metaDataManager = metaDataManager;
    }

}
